# 示例13 - 集群化的Quartz

说明

此作业演示如何在集群环境中使用Quartz,以及Quartz如何使用数据库持久化调度信息。

# SimpleRecoveryJob.java源码

package org.quartz.examples.example13;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;

/**
 * Job的一个缺陷实现,用于单元测试
 * 作者:James House
 */
public class SimpleRecoveryJob implements Job {

  private static Logger _log = LoggerFactory.getLogger(SimpleRecoveryJob.class);

  private static final String COUNT = "count";

  /**
   * Quartz需要一个公共的空参构造函数,以便调度器可以在需要时实例化类
   */
  public SimpleRecoveryJob() {
  }

  /**
   * 当与此作业相关联的触发器触发时,由调度器调用此方法
   * Throws:作业执行异常(JobExecutionException)-当执行作业时产生异常
   */
  public void execute(JobExecutionContext context) throws JobExecutionException {

    JobKey jobKey = context.getJobDetail().getKey();

    //如果作业正在恢复,则打印消息
    if (context.isRecovering()) {
      _log.info("SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
    } else {
      _log.info("SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
    }

    //延迟10秒
    long delay = 10L * 1000L;
    try {
      Thread.sleep(delay);
    } catch (Exception e) {
      //
    }

    JobDataMap data = context.getJobDetail().getJobDataMap();
    int count;
    if (data.containsKey(COUNT)) {
      count = data.getInt(COUNT);
    } else {
      count = 0;
    }
    count++;
    data.put(COUNT, count);

    _log.info("SimpleRecoveryJob: " + jobKey + " done at " + new Date() + "\n Execution #" + count);

  }

}

# SimpleRecoveryStatefulJob.java源码

package org.quartz.examples.example13;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;

/**
 * 此作业具有与SimpleRecoveryJob相同的功能,只是此作业实现的是“有状态”的,因为它将在每次执行后自动重新持久化其数据  
 *(JobDataMap),并且一次只能执行JobDetail的一个实例
 * 作者:Bill Kratzer
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleRecoveryStatefulJob extends SimpleRecoveryJob {

  public SimpleRecoveryStatefulJob() {
    super();
  }
}

# ClusterExample.java源码

package org.quartz.examples.example13;

import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于测试/显示JDBCJobStore(JobStoreTX或JobStoreCMT)的群集功能
 * 所有实例都必须使用不同的属性文件,因为它们的实例ID必须不同,但所有其他属性都应该相同
 * 如果您希望它清除现有的作业和触发器,请传递一个名为“clearJobs”的命令行参数
 * 您可能应该从一组“新的”表开始(假设您可能有来自其他测试的数据遗留在其中),因为将非集群设置的数据与集群设置的混合可能会很糟糕
 * 尝试在运行时杀死其中一个群集实例,并查看其余实例是否恢复正在进行的作业。注意,在默认设置下,检测故障可能需要15秒左右的时间
 * 也可以尝试在调度程序中注册/不注册关机挂钩插件的情况下运行它。(org.quartz.plugins.management.ShutdownHookPlugin)
 * 注意:不要在单独的机器上运行集群,除非它们的时钟使用某种形式的时间同步服务(如NTP守护程序)进行同步
 * 
 * 参考:SimpleRecoveryJob
 * 参考:SimpleRecoveryStatefulJob
 * 作者:James House
 */
public class ClusterExample {

  private static Logger _log = LoggerFactory.getLogger(ClusterExample.class);

  public void run(boolean inClearJobs, boolean inScheduleJobs) throws Exception {

    //首先,我们必须获得对调度器的引用
    SchedulerFactory sf = new StdSchedulerFactory();
    Scheduler sched = sf.getScheduler();

    if (inClearJobs) {
      _log.warn("***** Deleting existing jobs/triggers *****");
      sched.clear();
    }

    _log.info("------- Initialization Complete -----------");

    if (inScheduleJobs) {

      _log.info("------- Scheduling Jobs ------------------");

      String schedId = sched.getSchedulerInstanceId();

      int count = 1;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId) 
          .requestRecovery()
          .build();

      SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
          .startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;
		
      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId) 
          .requestRecovery()
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(2, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryStatefulJob.class).withIdentity("job_" + count, schedId) 
          .requestRecovery()
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(3)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery()
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(4)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
                + "/" + trigger.getRepeatInterval());
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery()
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInMilliseconds(4500L)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
                + "/" + trigger.getRepeatInterval());
      sched.scheduleJob(job, trigger);
    }

    //在调用start()之前,作业不会启动
    _log.info("------- Starting Scheduler ---------------");
    sched.start();
    _log.info("------- Started Scheduler ----------------");

    _log.info("------- Waiting for one hour... ----------");
    try {
      Thread.sleep(3600L * 1000L);
    } catch (Exception e) {
      //
    }

    _log.info("------- Shutting Down --------------------");
    sched.shutdown();
    _log.info("------- Shutdown Complete ----------------");
  }

  public static void main(String[] args) throws Exception {
    boolean clearJobs = false;
    boolean scheduleJobs = true;

    for (String arg : args) {
      if (arg.equalsIgnoreCase("clearJobs")) {
        clearJobs = true;
      } else if (arg.equalsIgnoreCase("dontScheduleJobs")) {
        scheduleJobs = false;
      }
    }

    ClusterExample example = new ClusterExample();
    example.run(clearJobs, scheduleJobs);
  }
}

# 集群配置

此示例需要数据库支持。示例使用 PostgreSQL 作为数据库,并通过 JDBC JobStore 实现集群。

# instance1.properties

实例1的配置文件,实例ID为 instance_one

#============================================================================
# 配置主调度器属性
#============================================================================

org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_one

#============================================================================
# 配置线程池
#============================================================================

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5

#============================================================================
# 配置JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true

#============================================================================
# 配置数据源
#============================================================================

org.quartz.dataSource.myDS.driver: org.postgresql.Driver
org.quartz.dataSource.myDS.URL: jdbc:postgresql://localhost:5432/quartz
org.quartz.dataSource.myDS.user: quartz
org.quartz.dataSource.myDS.password: quartz
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0

# instance2.properties

实例2的配置文件,实例ID为 instance_two,其余配置与实例1完全相同:

#============================================================================
# 配置主调度器属性
#============================================================================

org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_two

#============================================================================
# 配置线程池
#============================================================================

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5

#============================================================================
# 配置JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true

#============================================================================
# 配置数据源
#============================================================================

org.quartz.dataSource.myDS.driver: org.postgresql.Driver
org.quartz.dataSource.myDS.URL: jdbc:postgresql://localhost:5432/quartz
org.quartz.dataSource.myDS.user: quartz
org.quartz.dataSource.myDS.password: quartz
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0

重要提示

两个实例的配置文件中,org.quartz.scheduler.instanceId 必须不同,但所有其他属性应保持一致。不要在时钟未同步的独立机器上运行集群,应使用时间同步服务(如NTP守护程序)来同步各机器时钟。

# 运行示例

# 前置准备

  1. 安装 PostgreSQL 数据库,创建名为 quartz 的数据库和用户
  2. 执行 Quartz 发行包中提供的 SQL 表创建脚本(位于 docs/dbTables/ 目录),在数据库中创建 Quartz 所需的表
  3. 下载 PostgreSQL JDBC 驱动(可从 http://jdbc.postgresql.org 获取),将 jar 文件放入 Quartz 发行包的 lib 文件夹中
  4. 根据实际环境修改 instance1.propertiesinstance2.properties 中的数据库连接信息

# 启动步骤

Windows 用户:

  1. 如有必要,修改 instance1.batinstance2.bat,设置 JAVA_HOMEJDBC_CP
  2. 运行 instance1.bat(可在启动时传入 clearJobs 参数清除数据库中已有的作业和触发器)
  3. 第一个实例启动后,运行 instance2.bat(脚本会自动传入 dontScheduleJobs 参数,避免重复调度)

UNIX/Linux 用户:

  1. 如有必要,修改 instance1.shinstance2.sh,设置 JAVA_HOMEJDBC_CP
  2. 执行 instance1.sh
  3. 第一个实例启动后,执行 instance2.sh

故障转移测试

尝试在运行时终止其中一个集群实例,观察其余实例是否接管正在进行的作业。注意,在默认设置下,检测故障可能需要约15秒的时间。


微信公众号

QQ交流群
原创网站开发,偏差难以避免。

如若发现错误,诚心感谢反馈。

愿你倾心相念,愿你学有所成。

愿你朝华相顾,愿你前程似锦。